<?php
/*
 * 接收客户端的sql,执行sql,结果返回
 * */
class DBServer{

    protected  $server;
    protected  $min; //最小连接数
    protected  $max; //最大连接数
    protected  $idle_pool;//空闲连接池
    protected  $worker_pool;//工作连接池
    protected  $wait_queue;//工作连接池
    protected  $db_config; //配置信息
    public  function  __construct($config)
    {
        //全局对象
        $this->server=new \swoole\server('0.0.0.0',9800);
        $this->server->set($config['tcp']);
        //注册事件
        $this->server->on('WorkerStart',[$this,'onWorkerStart']);
        $this->server->on('receive',[$this,'onReceive']);
        //$this->server->on('close',[$this,'onClose']);
        $this->max=$config['server']['max'];
        $this->min=$config['server']['min'];
        $this->idle_pool=new SplQueue();
       // $this->worker_pool=new SplQueue();
        //$this->wait_queue=new SplQueue();
        $this->db_config=$config;
        //$this->idle_pool->push('1');
        //$this->idle_pool->push('2');
        //echo $this->idle_pool->shift();
        $this->server->start(); //全局生命周期
    }

    public  function  onWorkerStart($server,$worker_id){

        //生成数据库连接
        for ($i=0;$i<$this->min;$i++){
            $pdo=$this->db_connect($this->db_config);
            $this->push_queue('idle_pool',$pdo);
        }

       /*swoole_timer_tick(1000,function (){
                 //var_dump($this->wait_queue);
        });*/



    }

    public  function  onReceive($server,$fd,$reactor_id,$data){
        $len=unpack('N',$data)[1];
        $body=substr($data,-$len);//去除二进制数据之后,不要包头的数据

        //判断有没有空闲连接,等待sql执行
        /*if(count( $this->idle_pool)===0){
            $queueName='wait_queue';
            $this->push_queue($queueName,$body,$fd);
        }*/

        $res=$this->query($fd,$body); //执行一条sql
//        var_dump(count($res));
    }

    protected  function query($fd,$sql){
        $idle_pool=$this->idle_pool->shift();
        $db=$idle_pool['db'];
        //执行sql的时候,两次循环,如果执行成功,就跳出循环失败

        for ($i=0;$i<2;$i++){
            try{
                $res=$db->query($sql);
                break;
            }catch (PDOException $e){
                $err_msg=$e->getMessage();
                //重新链接
                if(strpos($err_msg,'MySQL server has gone away')!==false){
                       $db=$this->db_connect($this->db_config);
                }
                $res=$db->query($sql);
                break;
            }

        }

        //如果当前是select
        if(preg_match('/^select/i',$sql)){
            $data=$res->fetchAll(PDO::FETCH_ASSOC); //执行查询
        }else{
            $data=$res;
        }
        //sleep(2); //同步执行,如果有sql没有执行完
        //执行完毕,重新放回到空闲连接池
        $this->push_queue('idle_pool',$db,$fd);

        return $data;
    }

    //向队列当中添加连接
    public  function  push_queue($queueName,$pdo,$fd=0){
        $arr=[
            'fd'=>$fd,
            'db'=>$pdo
        ];
        $this->$queueName->push($arr);
    }


    /**
     * 数据库长连接
     * @param $db_config
     * @return array|PDO
     */
    public  function  db_connect($db_config){
         try{
             $pdo=new PDO($db_config['db']['dsn'],$db_config['db']['user'],$db_config['db']['password'],[PDO::ATTR_PERSISTENT=>true,PDO::ATTR_SERVER_INFO=>true]);
             return $pdo;
         }catch (Exception $e){
            echo $e->getMessage();
            return [];
         }
    }

}

$config['tcp']=[
    'worker_num'=>1,
    'open_length_check' => true,
    'package_length_type'=>'N',
    'package_length_offset'=>0, //计算总长度
    'package_body_offset'=>4,//包体位置
    ];

$config['server']=[
    'max'=>20,
    'min'=>2
];

$config['db']=[
    'dsn'=>'mysql:host=localhost;dbname=test',
    'user'=>'root',
    'password'=>'Qq!990979940'
];

new DBServer($config);
